use crate::function::Func;
use crate::incremental::view::IncrementalView;
use crate::index_method::{IndexMethodAttachment, IndexMethodConfiguration};
use crate::translate::expr::{bind_and_rewrite_expr, walk_expr, BindingBehavior, WalkControl};
use crate::translate::index::{resolve_index_method_parameters, resolve_sorted_columns};
use crate::translate::planner::ROWID_STRS;
use crate::vdbe::affinity::Affinity;
use parking_lot::RwLock;
use turso_macros::AtomicEnum;

#[derive(Debug, Clone, AtomicEnum)]
pub enum ViewState {
    Ready,
    InProgress,
}

/// Simple view structure for non-materialized views
#[derive(Debug)]
pub struct View {
    pub name: String,
    pub sql: String,
    pub select_stmt: ast::Select,
    pub columns: Vec<Column>,
    pub state: AtomicViewState,
}

impl View {
    fn new(name: String, sql: String, select_stmt: ast::Select, columns: Vec<Column>) -> Self {
        Self {
            name,
            sql,
            select_stmt,
            columns,
            state: AtomicViewState::new(ViewState::Ready),
        }
    }

    pub fn process(&self) -> Result<()> {
        let state = self.state.get();
        match state {
            ViewState::InProgress => {
                bail_parse_error!("view {} is circularly defined", self.name)
            }
            ViewState::Ready => {
                self.state.set(ViewState::InProgress);
                Ok(())
            }
        }
    }

    pub fn done(&self) {
        let state = self.state.get();
        match state {
            ViewState::InProgress => {
                self.state.set(ViewState::Ready);
            }
            ViewState::Ready => {}
        }
    }
}

impl Clone for View {
    fn clone(&self) -> Self {
        Self {
            name: self.name.clone(),
            sql: self.sql.clone(),
            select_stmt: self.select_stmt.clone(),
            columns: self.columns.clone(),
            state: AtomicViewState::new(ViewState::Ready),
        }
    }
}

/// Type alias for regular views collection
pub type ViewsMap = HashMap<String, Arc<View>>;

/// Trigger structure
#[derive(Debug, Clone)]
pub struct Trigger {
    pub name: String,
    pub sql: String,
    pub table_name: String,
    pub time: turso_parser::ast::TriggerTime,
    pub event: turso_parser::ast::TriggerEvent,
    pub for_each_row: bool,
    pub when_clause: Option<turso_parser::ast::Expr>,
    pub commands: Vec<turso_parser::ast::TriggerCmd>,
    pub temporary: bool,
}

impl Trigger {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        name: String,
        sql: String,
        table_name: String,
        time: Option<turso_parser::ast::TriggerTime>,
        event: turso_parser::ast::TriggerEvent,
        for_each_row: bool,
        when_clause: Option<turso_parser::ast::Expr>,
        commands: Vec<turso_parser::ast::TriggerCmd>,
        temporary: bool,
    ) -> Self {
        Self {
            name,
            sql,
            table_name,
            time: time.unwrap_or(turso_parser::ast::TriggerTime::Before),
            event,
            for_each_row,
            when_clause,
            commands,
            temporary,
        }
    }
}

use crate::storage::btree::{BTreeCursor, CursorTrait};
use crate::translate::collate::CollationSeq;
use crate::translate::plan::{SelectPlan, TableReferences};
use crate::util::{
    module_args_from_sql, module_name_from_sql, type_from_name, IOExt, UnparsedFromSqlIndex,
};
use crate::{
    bail_parse_error, contains_ignore_ascii_case, eq_ignore_ascii_case, match_ignore_ascii_case,
    Connection, LimboError, MvCursor, MvStore, Pager, SymbolTable, ValueRef, VirtualTable,
};
use crate::{util::normalize_ident, Result};
use core::fmt;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Deref;
use std::sync::Arc;
use tracing::trace;
use turso_parser::ast::{
    self, ColumnDefinition, Expr, InitDeferredPred, Literal, RefAct, SortOrder, TableOptions,
};
use turso_parser::{
    ast::{Cmd, CreateTableBody, ResultColumn, Stmt},
    parser::Parser,
};

const SCHEMA_TABLE_NAME: &str = "sqlite_schema";
const SCHEMA_TABLE_NAME_ALT: &str = "sqlite_master";
pub const DBSP_TABLE_PREFIX: &str = "__turso_internal_dbsp_state_v";

/// Used to refer to the implicit rowid column in tables without an alias during UPDATE
pub const ROWID_SENTINEL: usize = usize::MAX;

/// Internal table prefixes that should be protected from CREATE/DROP
pub const RESERVED_TABLE_PREFIXES: [&str; 2] = ["sqlite_", "__turso_internal_"];

/// Check if a table name refers to a system table that should be protected from direct writes
pub fn is_system_table(table_name: &str) -> bool {
    let normalized = table_name.to_lowercase();
    normalized == SCHEMA_TABLE_NAME
        || normalized == SCHEMA_TABLE_NAME_ALT
        || table_name.starts_with(DBSP_TABLE_PREFIX)
}

#[derive(Debug)]
pub struct Schema {
    pub tables: HashMap<String, Arc<Table>>,

    /// Track which tables are actually materialized views
    pub materialized_view_names: HashSet<String>,
    /// Store original SQL for materialized views (for .schema command)
    pub materialized_view_sql: HashMap<String, String>,
    /// The incremental view objects (DBSP circuits)
    pub incremental_views: HashMap<String, Arc<Mutex<IncrementalView>>>,

    pub views: ViewsMap,

    /// table_name to list of triggers
    pub triggers: HashMap<String, VecDeque<Arc<Trigger>>>,

    /// table_name to list of indexes for the table
    pub indexes: HashMap<String, VecDeque<Arc<Index>>>,
    pub has_indexes: std::collections::HashSet<String>,
    pub indexes_enabled: bool,
    pub schema_version: u32,

    /// Mapping from table names to the materialized views that depend on them
    pub table_to_materialized_views: HashMap<String, Vec<String>>,

    /// Track views that exist but have incompatible versions
    pub incompatible_views: HashSet<String>,
}

impl Schema {
    pub fn new(indexes_enabled: bool) -> Self {
        let mut tables: HashMap<String, Arc<Table>> = HashMap::new();
        let has_indexes = std::collections::HashSet::new();
        let indexes: HashMap<String, VecDeque<Arc<Index>>> = HashMap::new();
        #[allow(clippy::arc_with_non_send_sync)]
        tables.insert(
            SCHEMA_TABLE_NAME.to_string(),
            Arc::new(Table::BTree(sqlite_schema_table().into())),
        );
        for function in VirtualTable::builtin_functions() {
            tables.insert(
                function.name.to_owned(),
                Arc::new(Table::Virtual(Arc::new((*function).clone()))),
            );
        }
        let materialized_view_names = HashSet::new();
        let materialized_view_sql = HashMap::new();
        let incremental_views = HashMap::new();
        let views: ViewsMap = HashMap::new();
        let triggers = HashMap::new();
        let table_to_materialized_views: HashMap<String, Vec<String>> = HashMap::new();
        let incompatible_views = HashSet::new();
        Self {
            tables,
            materialized_view_names,
            materialized_view_sql,
            incremental_views,
            views,
            triggers,
            indexes,
            has_indexes,
            indexes_enabled,
            schema_version: 0,
            table_to_materialized_views,
            incompatible_views,
        }
    }

    pub fn is_unique_idx_name(&self, name: &str) -> bool {
        !self
            .indexes
            .iter()
            .any(|idx| idx.1.iter().any(|i| i.name == name))
    }
    pub fn add_materialized_view(&mut self, view: IncrementalView, table: Arc<Table>, sql: String) {
        let name = normalize_ident(view.name());

        // Add to tables (so it appears as a regular table)
        self.tables.insert(name.clone(), table);

        // Track that this is a materialized view
        self.materialized_view_names.insert(name.clone());
        self.materialized_view_sql.insert(name.clone(), sql);

        // Store the incremental view (DBSP circuit)
        self.incremental_views
            .insert(name, Arc::new(Mutex::new(view)));
    }

    pub fn get_materialized_view(&self, name: &str) -> Option<Arc<Mutex<IncrementalView>>> {
        let name = normalize_ident(name);
        self.incremental_views.get(&name).cloned()
    }

    /// Check if DBSP state table exists with the current version
    pub fn has_compatible_dbsp_state_table(&self, view_name: &str) -> bool {
        use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
        let view_name = normalize_ident(view_name);
        let expected_table_name = format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{view_name}");

        // Check if a table with the expected versioned name exists
        self.tables.contains_key(&expected_table_name)
    }

    pub fn is_materialized_view(&self, name: &str) -> bool {
        let name = normalize_ident(name);
        self.materialized_view_names.contains(&name)
    }

    /// Check if a table has any incompatible dependent materialized views
    pub fn has_incompatible_dependent_views(&self, table_name: &str) -> Vec<String> {
        let table_name = normalize_ident(table_name);

        // Get all materialized views that depend on this table
        let dependent_views = self
            .table_to_materialized_views
            .get(&table_name)
            .cloned()
            .unwrap_or_default();

        // Filter to only incompatible views
        dependent_views
            .into_iter()
            .filter(|view_name| self.incompatible_views.contains(view_name))
            .collect()
    }

    pub fn remove_view(&mut self, name: &str) -> Result<()> {
        let name = normalize_ident(name);

        if self.views.contains_key(&name) {
            self.views.remove(&name);
            Ok(())
        } else if self.materialized_view_names.contains(&name) {
            // Remove from tables
            self.tables.remove(&name);

            // Remove DBSP state table and its indexes from in-memory schema
            use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
            let dbsp_table_name = format!("{DBSP_TABLE_PREFIX}{DBSP_CIRCUIT_VERSION}_{name}");
            self.tables.remove(&dbsp_table_name);
            self.remove_indices_for_table(&dbsp_table_name);

            // Remove from materialized view tracking
            self.materialized_view_names.remove(&name);
            self.materialized_view_sql.remove(&name);
            self.incremental_views.remove(&name);

            // Remove from table_to_materialized_views dependencies
            for views in self.table_to_materialized_views.values_mut() {
                views.retain(|v| v != &name);
            }

            Ok(())
        } else {
            Err(crate::LimboError::ParseError(format!(
                "no such view: {name}"
            )))
        }
    }

    /// Register that a materialized view depends on a table
    pub fn add_materialized_view_dependency(&mut self, table_name: &str, view_name: &str) {
        let table_name = normalize_ident(table_name);
        let view_name = normalize_ident(view_name);

        self.table_to_materialized_views
            .entry(table_name)
            .or_default()
            .push(view_name);
    }

    /// Get all materialized views that depend on a given table
    pub fn get_dependent_materialized_views(&self, table_name: &str) -> Vec<String> {
        if self.table_to_materialized_views.is_empty() {
            return Vec::new();
        }
        let table_name = normalize_ident(table_name);
        self.table_to_materialized_views
            .get(&table_name)
            .cloned()
            .unwrap_or_default()
    }

    /// Add a regular (non-materialized) view
    pub fn add_view(&mut self, view: View) -> Result<()> {
        self.check_object_name_conflict(&view.name)?;
        let name = normalize_ident(&view.name);
        self.views.insert(name, Arc::new(view));
        Ok(())
    }

    /// Get a regular view by name
    pub fn get_view(&self, name: &str) -> Option<Arc<View>> {
        let name = normalize_ident(name);
        self.views.get(&name).cloned()
    }

    pub fn add_trigger(&mut self, trigger: Trigger, table_name: &str) -> Result<()> {
        self.check_object_name_conflict(&trigger.name)?;
        let table_name = normalize_ident(table_name);

        // See [Schema::add_index] for why we push to the front of the deque.
        self.triggers
            .entry(table_name)
            .or_default()
            .push_front(Arc::new(trigger));

        Ok(())
    }

    pub fn remove_trigger(&mut self, name: &str) -> Result<()> {
        let name = normalize_ident(name);

        let mut removed = false;
        for triggers_list in self.triggers.values_mut() {
            for i in 0..triggers_list.len() {
                let trigger = &triggers_list[i];
                if normalize_ident(&trigger.name) == name {
                    removed = true;
                    triggers_list.remove(i);
                    break;
                }
            }
            if removed {
                break;
            }
        }
        if !removed {
            return Err(crate::LimboError::ParseError(format!(
                "no such trigger: {name}"
            )));
        }
        Ok(())
    }

    pub fn get_trigger_for_table(&self, table_name: &str, name: &str) -> Option<Arc<Trigger>> {
        let table_name = normalize_ident(table_name);
        let name = normalize_ident(name);
        self.triggers
            .get(&table_name)
            .and_then(|triggers| triggers.iter().find(|t| t.name == name).cloned())
    }

    pub fn get_triggers_for_table(
        &self,
        table_name: &str,
    ) -> impl Iterator<Item = &Arc<Trigger>> + Clone {
        let table_name = normalize_ident(table_name);
        self.triggers
            .get(&table_name)
            .map(|triggers| triggers.iter())
            .unwrap_or_default()
    }

    pub fn get_trigger(&self, name: &str) -> Option<Arc<Trigger>> {
        let name = normalize_ident(name);
        self.triggers
            .values()
            .flatten()
            .find(|t| t.name == name)
            .cloned()
    }

    pub fn add_btree_table(&mut self, table: Arc<BTreeTable>) -> Result<()> {
        self.check_object_name_conflict(&table.name)?;
        let name = normalize_ident(&table.name);
        self.tables.insert(name, Table::BTree(table).into());
        Ok(())
    }

    pub fn add_virtual_table(&mut self, table: Arc<VirtualTable>) -> Result<()> {
        self.check_object_name_conflict(&table.name)?;
        let name = normalize_ident(&table.name);
        self.tables.insert(name, Table::Virtual(table).into());
        Ok(())
    }

    pub fn get_table(&self, name: &str) -> Option<Arc<Table>> {
        let name = normalize_ident(name);
        let name = if name.eq_ignore_ascii_case(SCHEMA_TABLE_NAME_ALT) {
            SCHEMA_TABLE_NAME
        } else {
            &name
        };
        self.tables.get(name).cloned()
    }

    pub fn remove_table(&mut self, table_name: &str) {
        let name = normalize_ident(table_name);
        self.tables.remove(&name);

        // If this was a materialized view, also clean up the metadata
        if self.materialized_view_names.remove(&name) {
            self.incremental_views.remove(&name);
            self.materialized_view_sql.remove(&name);
        }
    }

    pub fn get_btree_table(&self, name: &str) -> Option<Arc<BTreeTable>> {
        let name = normalize_ident(name);
        if let Some(table) = self.tables.get(&name) {
            table.btree()
        } else {
            None
        }
    }

    pub fn add_index(&mut self, index: Arc<Index>) -> Result<()> {
        self.check_object_name_conflict(&index.name)?;
        let table_name = normalize_ident(&index.table_name);
        // We must add the new index to the front of the deque, because SQLite stores index definitions as a linked list
        // where the newest parsed index entry is at the head of list. If we would add it to the back of a regular Vec for example,
        // then we would evaluate ON CONFLICT DO UPDATE clauses in the wrong index iteration order and UPDATE the wrong row. One might
        // argue that this is an implementation detail and we should not care about this, but it makes e.g. the fuzz test 'partial_index_mutation_and_upsert_fuzz'
        // fail, so let's just be compatible.
        self.indexes
            .entry(table_name)
            .or_default()
            .push_front(index.clone());
        Ok(())
    }

    pub fn get_indices(&self, table_name: &str) -> impl Iterator<Item = &Arc<Index>> {
        let name = normalize_ident(table_name);
        self.indexes
            .get(&name)
            .map(|v| v.iter())
            .unwrap_or_default()
            .filter(|i| !i.is_backing_btree_index())
    }

    pub fn get_index(&self, table_name: &str, index_name: &str) -> Option<&Arc<Index>> {
        let name = normalize_ident(table_name);
        self.indexes
            .get(&name)?
            .iter()
            .find(|index| index.name == index_name)
    }

    pub fn remove_indices_for_table(&mut self, table_name: &str) {
        let name = normalize_ident(table_name);
        self.indexes.remove(&name);
    }

    pub fn remove_index(&mut self, idx: &Index) {
        let name = normalize_ident(&idx.table_name);
        self.indexes
            .get_mut(&name)
            .expect("Must have the index")
            .retain_mut(|other_idx| other_idx.name != idx.name);
    }

    pub fn table_has_indexes(&self, table_name: &str) -> bool {
        let name = normalize_ident(table_name);
        self.has_indexes.contains(&name)
    }

    pub fn table_set_has_index(&mut self, table_name: &str) {
        self.has_indexes.insert(table_name.to_string());
    }

    pub fn indexes_enabled(&self) -> bool {
        self.indexes_enabled
    }

    /// Update [Schema] by scanning the first root page (sqlite_schema)
    pub fn make_from_btree(
        &mut self,
        mv_cursor: Option<Arc<RwLock<MvCursor>>>,
        pager: Arc<Pager>,
        syms: &SymbolTable,
    ) -> Result<()> {
        assert!(
            mv_cursor.is_none(),
            "mvcc not yet supported for make_from_btree"
        );
        let mut cursor = BTreeCursor::new_table(Arc::clone(&pager), 1, 10);

        let mut from_sql_indexes = Vec::with_capacity(10);
        let mut automatic_indices: HashMap<String, Vec<(String, i64)>> = HashMap::with_capacity(10);

        // Store DBSP state table root pages: view_name -> dbsp_state_root_page
        let mut dbsp_state_roots: HashMap<String, i64> = HashMap::new();
        // Store DBSP state table index root pages: view_name -> dbsp_state_index_root_page
        let mut dbsp_state_index_roots: HashMap<String, i64> = HashMap::new();
        // Store materialized view info (SQL and root page) for later creation
        let mut materialized_view_info: HashMap<String, (String, i64)> = HashMap::new();

        pager.begin_read_tx()?;

        pager.io.block(|| cursor.rewind())?;

        loop {
            let Some(row) = pager.io.block(|| cursor.record())? else {
                break;
            };

            let mut record_cursor = cursor.record_cursor.borrow_mut();
            // sqlite schema table has 5 columns: type, name, tbl_name, rootpage, sql
            let ty_value = record_cursor.get_value(&row, 0)?;
            let ValueRef::Text(ty) = ty_value else {
                return Err(LimboError::ConversionError("Expected text value".into()));
            };
            let ValueRef::Text(name) = record_cursor.get_value(&row, 1)? else {
                return Err(LimboError::ConversionError("Expected text value".into()));
            };
            let table_name_value = record_cursor.get_value(&row, 2)?;
            let ValueRef::Text(table_name) = table_name_value else {
                return Err(LimboError::ConversionError("Expected text value".into()));
            };
            let root_page_value = record_cursor.get_value(&row, 3)?;
            let ValueRef::Integer(root_page) = root_page_value else {
                return Err(LimboError::ConversionError("Expected integer value".into()));
            };
            let sql_value = record_cursor.get_value(&row, 4)?;
            let sql_textref = match sql_value {
                ValueRef::Text(sql) => Some(sql),
                _ => None,
            };
            let sql = sql_textref.map(|s| s.as_str());

            self.handle_schema_row(
                &ty,
                &name,
                &table_name,
                root_page,
                sql,
                syms,
                &mut from_sql_indexes,
                &mut automatic_indices,
                &mut dbsp_state_roots,
                &mut dbsp_state_index_roots,
                &mut materialized_view_info,
                None,
            )?;
            drop(record_cursor);
            drop(row);

            pager.io.block(|| cursor.next())?;
        }

        pager.end_read_tx();

        self.populate_indices(syms, from_sql_indexes, automatic_indices)?;

        self.populate_materialized_views(
            materialized_view_info,
            dbsp_state_roots,
            dbsp_state_index_roots,
        )?;

        Ok(())
    }

    /// Populate indices parsed from the schema.
    /// from_sql_indexes: indices explicitly created with CREATE INDEX
    /// automatic_indices: indices created automatically for primary key and unique constraints
    pub fn populate_indices(
        &mut self,
        syms: &SymbolTable,
        from_sql_indexes: Vec<UnparsedFromSqlIndex>,
        automatic_indices: std::collections::HashMap<String, Vec<(String, i64)>>,
    ) -> Result<()> {
        for unparsed_sql_from_index in from_sql_indexes {
            if !self.indexes_enabled() {
                self.table_set_has_index(&unparsed_sql_from_index.table_name);
            } else {
                let table = self
                    .get_btree_table(&unparsed_sql_from_index.table_name)
                    .unwrap();
                let index = Index::from_sql(
                    syms,
                    &unparsed_sql_from_index.sql,
                    unparsed_sql_from_index.root_page,
                    table.as_ref(),
                )?;
                self.add_index(Arc::new(index))?;
            }
        }

        for automatic_index in automatic_indices {
            if !self.indexes_enabled() {
                self.table_set_has_index(&automatic_index.0);
                continue;
            }
            // Autoindexes must be parsed in definition order.
            // The SQL statement parser enforces that the column definitions come first, and compounds are defined after that,
            // e.g. CREATE TABLE t (a, b, UNIQUE(a, b)), and you can't do something like CREATE TABLE t (a, b, UNIQUE(a, b), c);
            // Hence, we can process the singles first (unique_set.columns.len() == 1), and then the compounds (unique_set.columns.len() > 1).
            let table = self.get_btree_table(&automatic_index.0).unwrap();
            let mut automatic_indexes = automatic_index.1;
            automatic_indexes.reverse(); // reverse so we can pop() without shifting array elements, while still processing in left-to-right order
            let mut pk_index_added = false;
            for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() == 1) {
                let col_name = &unique_set.columns.first().unwrap().0;
                let Some((pos_in_table, column)) = table.get_column(col_name) else {
                    return Err(LimboError::ParseError(format!(
                        "Column {col_name} not found in table {}",
                        table.name
                    )));
                };
                if column.primary_key() && unique_set.is_primary_key {
                    if column.is_rowid_alias() {
                        // rowid alias, no index needed
                        continue;
                    }
                    assert!(table.primary_key_columns.first().unwrap().0 == *col_name, "trying to add a primary key index for column that is not the first column in the primary key: {} != {}", table.primary_key_columns.first().unwrap().0, col_name);
                    // Add single column primary key index
                    assert!(
                        !pk_index_added,
                        "trying to add a second primary key index for table {}",
                        table.name
                    );
                    pk_index_added = true;
                    self.add_index(Arc::new(Index::automatic_from_primary_key(
                        table.as_ref(),
                        automatic_indexes.pop().unwrap(),
                        1,
                    )?))?;
                } else {
                    // Add single column unique index
                    if let Some(autoidx) = automatic_indexes.pop() {
                        self.add_index(Arc::new(Index::automatic_from_unique(
                            table.as_ref(),
                            autoidx,
                            vec![(pos_in_table, unique_set.columns.first().unwrap().1)],
                        )?))?;
                    }
                }
            }
            for unique_set in table.unique_sets.iter().filter(|us| us.columns.len() > 1) {
                if unique_set.is_primary_key {
                    assert!(table.primary_key_columns.len() == unique_set.columns.len(), "trying to add a {}-column primary key index for table {}, but the table has {} primary key columns", unique_set.columns.len(), table.name, table.primary_key_columns.len());
                    // Add composite primary key index
                    assert!(
                        !pk_index_added,
                        "trying to add a second primary key index for table {}",
                        table.name
                    );
                    pk_index_added = true;
                    self.add_index(Arc::new(Index::automatic_from_primary_key(
                        table.as_ref(),
                        automatic_indexes.pop().unwrap(),
                        unique_set.columns.len(),
                    )?))?;
                } else {
                    // Add composite unique index
                    let mut column_indices_and_sort_orders =
                        Vec::with_capacity(unique_set.columns.len());
                    for (col_name, sort_order) in unique_set.columns.iter() {
                        let Some((pos_in_table, _)) = table.get_column(col_name) else {
                            return Err(crate::LimboError::ParseError(format!(
                                "Column {} not found in table {}",
                                col_name, table.name
                            )));
                        };
                        column_indices_and_sort_orders.push((pos_in_table, *sort_order));
                    }
                    self.add_index(Arc::new(Index::automatic_from_unique(
                        table.as_ref(),
                        automatic_indexes.pop().unwrap(),
                        column_indices_and_sort_orders,
                    )?))?;
                }
            }

            assert!(automatic_indexes.is_empty(), "all automatic indexes parsed from sqlite_schema should have been consumed, but {} remain", automatic_indexes.len());
        }
        Ok(())
    }

    /// Populate materialized views parsed from the schema.
    pub fn populate_materialized_views(
        &mut self,
        materialized_view_info: std::collections::HashMap<String, (String, i64)>,
        dbsp_state_roots: std::collections::HashMap<String, i64>,
        dbsp_state_index_roots: std::collections::HashMap<String, i64>,
    ) -> Result<()> {
        for (view_name, (sql, main_root)) in materialized_view_info {
            // Look up the DBSP state root for this view
            // If missing, it means version mismatch - skip this view
            // Check if we have a compatible DBSP state root
            let dbsp_state_root = if let Some(&root) = dbsp_state_roots.get(&view_name) {
                root
            } else {
                tracing::warn!(
                    "Materialized view '{}' has incompatible version or missing DBSP state table",
                    view_name
                );
                // Track this as an incompatible view
                self.incompatible_views.insert(view_name.clone());
                // Use a dummy root page - the view won't be usable anyway
                0
            };

            // Look up the DBSP state index root (may not exist for older schemas)
            let dbsp_state_index_root =
                dbsp_state_index_roots.get(&view_name).copied().unwrap_or(0);
            // Create the IncrementalView with all root pages
            let incremental_view = IncrementalView::from_sql(
                &sql,
                self,
                main_root,
                dbsp_state_root,
                dbsp_state_index_root,
            )?;
            let referenced_tables = incremental_view.get_referenced_table_names();

            // Create a BTreeTable for the materialized view
            let table = Arc::new(Table::BTree(Arc::new(BTreeTable {
                name: view_name.clone(),
                root_page: main_root,
                columns: incremental_view.column_schema.flat_columns(),
                primary_key_columns: Vec::new(),
                has_rowid: true,
                is_strict: false,
                has_autoincrement: false,
                foreign_keys: vec![],

                unique_sets: vec![],
            })));

            // Only add to schema if compatible
            if !self.incompatible_views.contains(&view_name) {
                self.add_materialized_view(incremental_view, table, sql);
            }

            // Register dependencies regardless of compatibility
            for table_name in referenced_tables {
                self.add_materialized_view_dependency(&table_name, &view_name);
            }
        }
        Ok(())
    }

    #[allow(clippy::too_many_arguments)]
    pub fn handle_schema_row(
        &mut self,
        ty: &str,
        name: &str,
        table_name: &str,
        root_page: i64,
        maybe_sql: Option<&str>,
        syms: &SymbolTable,
        from_sql_indexes: &mut Vec<UnparsedFromSqlIndex>,
        automatic_indices: &mut std::collections::HashMap<String, Vec<(String, i64)>>,
        dbsp_state_roots: &mut std::collections::HashMap<String, i64>,
        dbsp_state_index_roots: &mut std::collections::HashMap<String, i64>,
        materialized_view_info: &mut std::collections::HashMap<String, (String, i64)>,
        mv_store: Option<&Arc<MvStore>>,
    ) -> Result<()> {
        match ty {
            "table" => {
                let sql = maybe_sql.expect("sql should be present for table");
                let sql_bytes = sql.as_bytes();
                if root_page == 0 && contains_ignore_ascii_case!(sql_bytes, b"create virtual") {
                    // a virtual table is found in the sqlite_schema, but it's no
                    // longer in the in-memory schema. We need to recreate it if
                    // the module is loaded in the symbol table.
                    let vtab = if let Some(vtab) = syms.vtabs.get(name) {
                        vtab.clone()
                    } else {
                        let mod_name = module_name_from_sql(sql)?;
                        crate::VirtualTable::table(
                            Some(name),
                            mod_name,
                            module_args_from_sql(sql)?,
                            syms,
                        )?
                    };
                    self.add_virtual_table(vtab)?;
                } else {
                    let table = BTreeTable::from_sql(sql, root_page)?;

                    // Check if this is a DBSP state table
                    if table.name.starts_with(DBSP_TABLE_PREFIX) {
                        // Extract version and view name from __turso_internal_dbsp_state_v<version>_<viewname>
                        let suffix = table.name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();

                        // Parse version and view name (format: "<version>_<viewname>")
                        if let Some(underscore_pos) = suffix.find('_') {
                            let version_str = &suffix[..underscore_pos];
                            let view_name = &suffix[underscore_pos + 1..];

                            // Check version compatibility
                            if let Ok(stored_version) = version_str.parse::<u32>() {
                                use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
                                if stored_version == DBSP_CIRCUIT_VERSION {
                                    // Version matches, store the root page
                                    dbsp_state_roots.insert(view_name.to_string(), root_page);
                                } else {
                                    // Version mismatch - DO NOT insert into dbsp_state_roots
                                    // This will cause populate_materialized_views to skip this view
                                    tracing::warn!(
                                        "Skipping materialized view '{}' - has version {} but current version is {}. DROP and recreate the view to use it.",
                                        view_name, stored_version, DBSP_CIRCUIT_VERSION
                                    );
                                    // We can't track incompatible views here since we're in handle_schema_row
                                    // which doesn't have mutable access to self
                                }
                            }
                        }
                    }

                    self.add_btree_table(Arc::new(table))?;
                }
            }
            "index" => {
                assert!(mv_store.is_none(), "indexes not yet supported for mvcc");
                match maybe_sql {
                    Some(sql) => {
                        from_sql_indexes.push(UnparsedFromSqlIndex {
                            table_name: table_name.to_string(),
                            root_page,
                            sql: sql.to_string(),
                        });
                    }
                    None => {
                        // Automatic index on primary key and/or unique constraint, e.g.
                        // table|foo|foo|2|CREATE TABLE foo (a text PRIMARY KEY, b)
                        // index|sqlite_autoindex_foo_1|foo|3|
                        let index_name = name.to_string();
                        let table_name = table_name.to_string();

                        // Check if this is an index for a DBSP state table
                        if table_name.starts_with(DBSP_TABLE_PREFIX) {
                            // Extract version and view name from __turso_internal_dbsp_state_v<version>_<viewname>
                            let suffix = table_name.strip_prefix(DBSP_TABLE_PREFIX).unwrap();

                            // Parse version and view name (format: "<version>_<viewname>")
                            if let Some(underscore_pos) = suffix.find('_') {
                                let version_str = &suffix[..underscore_pos];
                                let view_name = &suffix[underscore_pos + 1..];

                                // Only store index root if version matches
                                if let Ok(stored_version) = version_str.parse::<u32>() {
                                    use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
                                    if stored_version == DBSP_CIRCUIT_VERSION {
                                        dbsp_state_index_roots
                                            .insert(view_name.to_string(), root_page);
                                    }
                                }
                            }
                        } else {
                            match automatic_indices.entry(table_name) {
                                std::collections::hash_map::Entry::Vacant(e) => {
                                    e.insert(vec![(index_name, root_page)]);
                                }
                                std::collections::hash_map::Entry::Occupied(mut e) => {
                                    e.get_mut().push((index_name, root_page));
                                }
                            }
                        }
                    }
                }
            }
            "view" => {
                use crate::schema::View;
                use turso_parser::ast::{Cmd, Stmt};
                use turso_parser::parser::Parser;

                let sql = maybe_sql.expect("sql should be present for view");
                let view_name = name.to_string();
                assert!(mv_store.is_none(), "views not yet supported for mvcc");

                // Parse the SQL to determine if it's a regular or materialized view
                let mut parser = Parser::new(sql.as_bytes());
                if let Ok(Some(Cmd::Stmt(stmt))) = parser.next_cmd() {
                    match stmt {
                        Stmt::CreateMaterializedView { .. } => {
                            // Store materialized view info for later creation
                            // We'll handle reuse logic and create the actual IncrementalView
                            // in a later pass when we have both the main root page and DBSP state root
                            materialized_view_info
                                .insert(view_name.clone(), (sql.to_string(), root_page));

                            // Mark the existing view for potential reuse
                            if self.incremental_views.contains_key(&view_name) {
                                // We'll check for reuse in the third pass
                            }
                        }
                        Stmt::CreateView {
                            view_name: _,
                            columns: column_names,
                            select,
                            ..
                        } => {
                            // Extract actual columns from the SELECT statement
                            let view_column_schema =
                                crate::util::extract_view_columns(&select, self)?;

                            // If column names were provided in CREATE VIEW (col1, col2, ...),
                            // use them to rename the columns
                            let mut final_columns = view_column_schema.flat_columns();
                            for (i, indexed_col) in column_names.iter().enumerate() {
                                if let Some(col) = final_columns.get_mut(i) {
                                    col.name = Some(indexed_col.col_name.to_string());
                                }
                            }

                            // Create regular view
                            let view =
                                View::new(name.to_string(), sql.to_string(), select, final_columns);
                            self.add_view(view)?;
                        }
                        _ => {}
                    }
                }
            }
            "trigger" => {
                use turso_parser::ast::{Cmd, Stmt};
                use turso_parser::parser::Parser;

                let sql = maybe_sql.expect("sql should be present for trigger");
                let trigger_name = name.to_string();

                let mut parser = Parser::new(sql.as_bytes());
                let Ok(Some(Cmd::Stmt(Stmt::CreateTrigger {
                    temporary,
                    if_not_exists: _,
                    trigger_name: _,
                    time,
                    event,
                    tbl_name,
                    for_each_row,
                    when_clause,
                    commands,
                }))) = parser.next_cmd()
                else {
                    return Err(crate::LimboError::ParseError(format!(
                        "invalid trigger sql: {sql}"
                    )));
                };
                self.add_trigger(
                    Trigger::new(
                        trigger_name.clone(),
                        sql.to_string(),
                        tbl_name.name.to_string(),
                        time,
                        event,
                        for_each_row,
                        when_clause.map(|e| *e),
                        commands,
                        temporary,
                    ),
                    tbl_name.name.as_str(),
                )?;
            }
            _ => {}
        };

        Ok(())
    }

    /// Compute all resolved FKs *referencing* `table_name` (arg: `table_name` is the parent).
    /// Each item contains the child table, normalized columns/positions, and the parent lookup
    /// strategy (rowid vs. UNIQUE index or PK).
    pub fn resolved_fks_referencing(&self, table_name: &str) -> Result<Vec<ResolvedFkRef>> {
        let fk_mismatch_err = |child: &str, parent: &str| -> crate::LimboError {
            crate::LimboError::Constraint(format!(
                "foreign key mismatch - \"{child}\" referencing \"{parent}\""
            ))
        };
        let target = normalize_ident(table_name);
        let mut out = Vec::with_capacity(4); // arbitrary estimate
        let parent_tbl = self
            .get_btree_table(&target)
            .ok_or_else(|| fk_mismatch_err("<unknown>", &target))?;

        // Precompute helper to find parent unique index, if it's not the rowid
        let find_parent_unique = |cols: &Vec<String>| -> Option<Arc<Index>> {
            self.get_indices(&parent_tbl.name)
                .find(|idx| {
                    idx.unique
                        && idx.columns.len() == cols.len()
                        && idx
                            .columns
                            .iter()
                            .zip(cols.iter())
                            .all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
                })
                .cloned()
        };

        for t in self.tables.values() {
            let Some(child) = t.btree() else {
                continue;
            };
            for fk in &child.foreign_keys {
                if !fk.parent_table.eq_ignore_ascii_case(&target) {
                    continue;
                }
                if fk.child_columns.is_empty() {
                    // SQLite requires an explicit child column list unless the table has a single-column PK that
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                }
                let child_cols: Vec<String> = fk.child_columns.clone();
                let mut child_pos = Vec::with_capacity(child_cols.len());

                for cname in &child_cols {
                    let (i, _) = child
                        .get_column(cname)
                        .ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
                    child_pos.push(i);
                }
                let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
                    if !parent_tbl.primary_key_columns.is_empty() {
                        parent_tbl
                            .primary_key_columns
                            .iter()
                            .map(|(col, _)| col)
                            .cloned()
                            .collect()
                    } else {
                        return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                    }
                } else {
                    fk.parent_columns.clone()
                };

                // Same length required
                if parent_cols.len() != child_cols.len() {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                }

                let mut parent_pos = Vec::with_capacity(parent_cols.len());
                for pc in &parent_cols {
                    let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
                        ROWID_STRS
                            .iter()
                            .any(|s| pc.eq_ignore_ascii_case(s))
                            .then_some(0)
                    });
                    let Some(p) = pos else {
                        return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                    };
                    parent_pos.push(p);
                }

                // Determine if parent key is ROWID/alias
                let parent_uses_rowid = parent_tbl.primary_key_columns.len().eq(&1) && {
                    if parent_tbl.primary_key_columns.len() == 1 {
                        let pk_name = &parent_tbl.primary_key_columns[0].0;
                        // rowid or alias INTEGER PRIMARY KEY; either is ok implicitly
                        parent_tbl.columns.iter().any(|c| {
                            c.is_rowid_alias()
                                && c.name
                                    .as_deref()
                                    .is_some_and(|n| n.eq_ignore_ascii_case(pk_name))
                        }) || ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(pk_name))
                    } else {
                        false
                    }
                };

                // If not rowid, there must be a non-partial UNIQUE exactly on parent_cols
                let parent_unique_index = if parent_uses_rowid {
                    None
                } else {
                    find_parent_unique(&parent_cols)
                };
                fk.validate()?;
                out.push(ResolvedFkRef {
                    child_table: Arc::clone(&child),
                    fk: Arc::clone(fk),
                    child_cols,
                    child_pos,
                    parent_pos,
                    parent_uses_rowid,
                    parent_unique_index,
                });
            }
        }
        Ok(out)
    }

    /// Compute all resolved FKs *declared by* `child_table`
    pub fn resolved_fks_for_child(&self, child_table: &str) -> crate::Result<Vec<ResolvedFkRef>> {
        let fk_mismatch_err = |child: &str, parent: &str| -> crate::LimboError {
            crate::LimboError::Constraint(format!(
                "foreign key mismatch - \"{child}\" referencing \"{parent}\""
            ))
        };
        let child_name = normalize_ident(child_table);
        let child = self
            .get_btree_table(&child_name)
            .ok_or_else(|| fk_mismatch_err(&child_name, "<unknown>"))?;

        let mut out = Vec::with_capacity(child.foreign_keys.len());

        for fk in &child.foreign_keys {
            let parent_name = normalize_ident(&fk.parent_table);
            let parent_tbl = self
                .get_btree_table(&parent_name)
                .ok_or_else(|| fk_mismatch_err(&child.name, &parent_name))?;

            let child_cols: Vec<String> = fk.child_columns.clone();
            if child_cols.is_empty() {
                return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
            }

            // Child positions exist
            let mut child_pos = Vec::with_capacity(child_cols.len());
            for cname in &child_cols {
                let (i, _) = child
                    .get_column(cname)
                    .ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?;
                child_pos.push(i);
            }

            let parent_cols: Vec<String> = if fk.parent_columns.is_empty() {
                if !parent_tbl.primary_key_columns.is_empty() {
                    parent_tbl
                        .primary_key_columns
                        .iter()
                        .map(|(col, _)| col)
                        .cloned()
                        .collect()
                } else {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                }
            } else {
                fk.parent_columns.clone()
            };

            if parent_cols.len() != child_cols.len() {
                return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
            }

            // Parent positions exist, or rowid sentinel
            let mut parent_pos = Vec::with_capacity(parent_cols.len());
            for pc in &parent_cols {
                let pos = parent_tbl.get_column(pc).map(|(i, _)| i).or_else(|| {
                    ROWID_STRS
                        .iter()
                        .any(|&r| r.eq_ignore_ascii_case(pc))
                        .then_some(0)
                });
                let Some(p) = pos else {
                    return Err(fk_mismatch_err(&child.name, &parent_tbl.name));
                };
                parent_pos.push(p);
            }

            let parent_uses_rowid = parent_cols.len().eq(&1) && {
                let c = parent_cols[0].as_str();
                ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(c))
                    || parent_tbl.columns.iter().any(|col| {
                        col.is_rowid_alias()
                            && col
                                .name
                                .as_deref()
                                .is_some_and(|n| n.eq_ignore_ascii_case(c))
                    })
            };

            // Must be PK or a non-partial UNIQUE on exactly those columns.
            let parent_unique_index = if parent_uses_rowid {
                None
            } else {
                self.get_indices(&parent_tbl.name)
                    .find(|idx| {
                        idx.unique
                            && idx.where_clause.is_none()
                            && idx.columns.len() == parent_cols.len()
                            && idx
                                .columns
                                .iter()
                                .zip(parent_cols.iter())
                                .all(|(ic, pc)| ic.name.eq_ignore_ascii_case(pc))
                    })
                    .cloned()
                    .ok_or_else(|| fk_mismatch_err(&child.name, &parent_tbl.name))?
                    .into()
            };

            fk.validate()?;
            out.push(ResolvedFkRef {
                child_table: Arc::clone(&child),
                fk: Arc::clone(fk),
                child_cols,
                child_pos,
                parent_pos,
                parent_uses_rowid,
                parent_unique_index,
            });
        }

        Ok(out)
    }

    /// Returns if any table declares a FOREIGN KEY whose parent is `table_name`.
    pub fn any_resolved_fks_referencing(&self, table_name: &str) -> bool {
        self.tables.values().any(|t| {
            let Some(bt) = t.btree() else {
                return false;
            };
            bt.foreign_keys
                .iter()
                .any(|fk| fk.parent_table == table_name)
        })
    }

    /// Returns true if `table_name` declares any FOREIGN KEYs
    pub fn has_child_fks(&self, table_name: &str) -> bool {
        self.get_table(table_name)
            .and_then(|t| t.btree())
            .is_some_and(|t| !t.foreign_keys.is_empty())
    }

    fn check_object_name_conflict(&self, name: &str) -> Result<()> {
        let normalized_name = normalize_ident(name);

        if self.tables.contains_key(&normalized_name) {
            return Err(crate::LimboError::ParseError(
                ["table \"", name, "\" already exists"].concat().to_string(),
            ));
        }

        if self.views.contains_key(&normalized_name) {
            return Err(crate::LimboError::ParseError(
                ["view \"", name, "\" already exists"].concat().to_string(),
            ));
        }

        for index_list in self.indexes.values() {
            if index_list.iter().any(|i| i.name.eq_ignore_ascii_case(name)) {
                return Err(crate::LimboError::ParseError(
                    ["index \"", name, "\" already exists"].concat().to_string(),
                ));
            }
        }

        Ok(())
    }
}

impl Clone for Schema {
    /// Cloning a `Schema` requires deep cloning of all internal tables and indexes, even though they are wrapped in `Arc`.
    /// Simply copying the `Arc` pointers would result in multiple `Schema` instances sharing the same underlying tables and indexes,
    /// which could lead to panics or data races if any instance attempts to modify them.
    /// To ensure each `Schema` is independent and safe to modify, we clone the underlying data for all tables and indexes.
    fn clone(&self) -> Self {
        let tables = self
            .tables
            .iter()
            .map(|(name, table)| match table.deref() {
                Table::BTree(table) => {
                    let table = Arc::deref(table);
                    (
                        name.clone(),
                        Arc::new(Table::BTree(Arc::new(table.clone()))),
                    )
                }
                Table::Virtual(table) => {
                    let table = Arc::deref(table);
                    (
                        name.clone(),
                        Arc::new(Table::Virtual(Arc::new(table.clone()))),
                    )
                }
                Table::FromClauseSubquery(from_clause_subquery) => (
                    name.clone(),
                    Arc::new(Table::FromClauseSubquery(from_clause_subquery.clone())),
                ),
            })
            .collect();
        let indexes = self
            .indexes
            .iter()
            .map(|(name, indexes)| {
                let indexes = indexes
                    .iter()
                    .map(|index| Arc::new((**index).clone()))
                    .collect();
                (name.clone(), indexes)
            })
            .collect();
        let materialized_view_names = self.materialized_view_names.clone();
        let materialized_view_sql = self.materialized_view_sql.clone();
        let incremental_views = self
            .incremental_views
            .iter()
            .map(|(name, view)| (name.clone(), view.clone()))
            .collect();
        let views = self
            .views
            .iter()
            .map(|(name, view)| (name.clone(), Arc::new((**view).clone())))
            .collect();
        let triggers = self
            .triggers
            .iter()
            .map(|(table_name, triggers)| {
                (
                    table_name.clone(),
                    triggers.iter().map(|t| Arc::new((**t).clone())).collect(),
                )
            })
            .collect();
        let incompatible_views = self.incompatible_views.clone();
        Self {
            tables,
            materialized_view_names,
            materialized_view_sql,
            incremental_views,
            views,
            triggers,
            indexes,
            has_indexes: self.has_indexes.clone(),
            indexes_enabled: self.indexes_enabled,
            schema_version: self.schema_version,
            table_to_materialized_views: self.table_to_materialized_views.clone(),
            incompatible_views,
        }
    }
}

#[derive(Clone, Debug)]
pub enum Table {
    BTree(Arc<BTreeTable>),
    Virtual(Arc<VirtualTable>),
    FromClauseSubquery(FromClauseSubquery),
}

impl Table {
    pub fn get_root_page(&self) -> i64 {
        match self {
            Table::BTree(table) => table.root_page,
            Table::Virtual(_) => unimplemented!(),
            Table::FromClauseSubquery(_) => unimplemented!(),
        }
    }

    pub fn get_name(&self) -> &str {
        match self {
            Self::BTree(table) => &table.name,
            Self::Virtual(table) => &table.name,
            Self::FromClauseSubquery(from_clause_subquery) => &from_clause_subquery.name,
        }
    }

    pub fn get_column_at(&self, index: usize) -> Option<&Column> {
        match self {
            Self::BTree(table) => table.columns.get(index),
            Self::Virtual(table) => table.columns.get(index),
            Self::FromClauseSubquery(from_clause_subquery) => {
                from_clause_subquery.columns.get(index)
            }
        }
    }

    /// Returns the column position and column for a given column name.
    pub fn get_column_by_name(&self, name: &str) -> Option<(usize, &Column)> {
        let name = normalize_ident(name);
        match self {
            Self::BTree(table) => table.get_column(name.as_str()),
            Self::Virtual(table) => table
                .columns
                .iter()
                .enumerate()
                .find(|(_, col)| col.name.as_ref() == Some(&name)),
            Self::FromClauseSubquery(from_clause_subquery) => from_clause_subquery
                .columns
                .iter()
                .enumerate()
                .find(|(_, col)| col.name.as_ref() == Some(&name)),
        }
    }

    pub fn columns(&self) -> &Vec<Column> {
        match self {
            Self::BTree(table) => &table.columns,
            Self::Virtual(table) => &table.columns,
            Self::FromClauseSubquery(from_clause_subquery) => &from_clause_subquery.columns,
        }
    }

    pub fn btree(&self) -> Option<Arc<BTreeTable>> {
        match self {
            Self::BTree(table) => Some(table.clone()),
            Self::Virtual(_) => None,
            Self::FromClauseSubquery(_) => None,
        }
    }

    pub fn virtual_table(&self) -> Option<Arc<VirtualTable>> {
        match self {
            Self::Virtual(table) => Some(table.clone()),
            _ => None,
        }
    }
}

impl PartialEq for Table {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::BTree(a), Self::BTree(b)) => Arc::ptr_eq(a, b),
            (Self::Virtual(a), Self::Virtual(b)) => Arc::ptr_eq(a, b),
            _ => false,
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct UniqueSet {
    pub columns: Vec<(String, SortOrder)>,
    pub is_primary_key: bool,
}

#[derive(Clone, Debug)]
pub struct BTreeTable {
    pub root_page: i64,
    pub name: String,
    pub primary_key_columns: Vec<(String, SortOrder)>,
    pub columns: Vec<Column>,
    pub has_rowid: bool,
    pub is_strict: bool,
    pub has_autoincrement: bool,
    pub unique_sets: Vec<UniqueSet>,
    pub foreign_keys: Vec<Arc<ForeignKey>>,
}

impl BTreeTable {
    pub fn get_rowid_alias_column(&self) -> Option<(usize, &Column)> {
        if self.primary_key_columns.len() == 1 {
            let (idx, col) = self.get_column(&self.primary_key_columns[0].0)?;
            if col.is_rowid_alias() {
                return Some((idx, col));
            }
        }
        None
    }

    /// Returns the column position and column for a given column name.
    /// Returns None if the column name is not found.
    /// E.g. if table is CREATE TABLE t (a, b, c)
    /// then get_column("b") returns (1, &Column { .. })
    pub fn get_column(&self, name: &str) -> Option<(usize, &Column)> {
        let name = normalize_ident(name);

        self.columns
            .iter()
            .enumerate()
            .find(|(_, column)| column.name.as_ref() == Some(&name))
    }

    pub fn from_sql(sql: &str, root_page: i64) -> Result<BTreeTable> {
        let mut parser = Parser::new(sql.as_bytes());
        let cmd = parser.next_cmd()?;
        match cmd {
            Some(Cmd::Stmt(Stmt::CreateTable { tbl_name, body, .. })) => {
                create_table(tbl_name.name.as_str(), &body, root_page)
            }
            _ => unreachable!("Expected CREATE TABLE statement"),
        }
    }

    /// Reconstruct the SQL for the table.
    /// FIXME: this makes us incompatible with SQLite since sqlite stores the user-provided SQL as is in
    /// `sqlite_schema.sql`
    /// For example, if a user creates a table like: `CREATE TABLE t              (x)`, we store it as
    /// `CREATE TABLE t (x)`, whereas sqlite stores it with the original extra whitespace.
    pub fn to_sql(&self) -> String {
        let mut sql = format!("CREATE TABLE {} (", self.name);
        let needs_pk_inline = self.primary_key_columns.len() == 1;
        // Add columns
        for (i, column) in self.columns.iter().enumerate() {
            if i > 0 {
                sql.push_str(", ");
            }

            // we need to wrap the column name in square brackets if it contains special characters
            let column_name = column.name.as_ref().expect("column name is None");
            if identifier_contains_special_chars(column_name) {
                sql.push('[');
                sql.push_str(column_name);
                sql.push(']');
            } else {
                sql.push_str(column_name);
            }

            if !column.ty_str.is_empty() {
                sql.push(' ');
                sql.push_str(&column.ty_str);
            }
            if column.notnull() {
                sql.push_str(" NOT NULL");
            }

            if column.unique() {
                sql.push_str(" UNIQUE");
            }
            if needs_pk_inline && column.primary_key() {
                sql.push_str(" PRIMARY KEY");
            }

            if let Some(default) = &column.default {
                sql.push_str(" DEFAULT ");
                sql.push_str(&default.to_string());
            }
        }

        let has_table_pk = !self.primary_key_columns.is_empty();
        // Add table-level PRIMARY KEY constraint if exists
        if !needs_pk_inline && has_table_pk {
            sql.push_str(", PRIMARY KEY (");
            for (i, col) in self.primary_key_columns.iter().enumerate() {
                if i > 0 {
                    sql.push_str(", ");
                }
                sql.push_str(&col.0);
            }
            sql.push(')');
        }

        for fk in &self.foreign_keys {
            sql.push_str(", FOREIGN KEY (");
            for (i, col) in fk.child_columns.iter().enumerate() {
                if i > 0 {
                    sql.push_str(", ");
                }
                sql.push_str(col);
            }
            sql.push_str(") REFERENCES ");
            sql.push_str(&fk.parent_table);
            sql.push('(');
            for (i, col) in fk.parent_columns.iter().enumerate() {
                if i > 0 {
                    sql.push_str(", ");
                }
                sql.push_str(col);
            }
            sql.push(')');

            // Add ON DELETE/UPDATE actions, NoAction is default so just make empty in that case
            if fk.on_delete != RefAct::NoAction {
                sql.push_str(" ON DELETE ");
                sql.push_str(match fk.on_delete {
                    RefAct::SetNull => "SET NULL",
                    RefAct::SetDefault => "SET DEFAULT",
                    RefAct::Cascade => "CASCADE",
                    RefAct::Restrict => "RESTRICT",
                    _ => "",
                });
            }
            if fk.on_update != RefAct::NoAction {
                sql.push_str(" ON UPDATE ");
                sql.push_str(match fk.on_update {
                    RefAct::SetNull => "SET NULL",
                    RefAct::SetDefault => "SET DEFAULT",
                    RefAct::Cascade => "CASCADE",
                    RefAct::Restrict => "RESTRICT",
                    _ => "",
                });
            }
            if fk.deferred {
                sql.push_str(" DEFERRABLE INITIALLY DEFERRED");
            }
        }
        sql.push(')');
        sql
    }

    pub fn column_collations(&self) -> Vec<CollationSeq> {
        self.columns
            .iter()
            .map(|column| column.collation())
            .collect()
    }
}

fn identifier_contains_special_chars(name: &str) -> bool {
    name.chars().any(|c| !c.is_ascii_alphanumeric() && c != '_')
}

#[derive(Debug, Default, Clone, Copy)]
pub struct PseudoCursorType {
    pub column_count: usize,
}

impl PseudoCursorType {
    pub fn new() -> Self {
        Self { column_count: 0 }
    }

    pub fn new_with_columns(columns: impl AsRef<[Column]>) -> Self {
        Self {
            column_count: columns.as_ref().len(),
        }
    }
}

/// A derived table from a FROM clause subquery.
#[derive(Debug, Clone)]
pub struct FromClauseSubquery {
    /// The name of the derived table; uses the alias if available.
    pub name: String,
    /// The query plan for the derived table.
    pub plan: Box<SelectPlan>,
    /// The columns of the derived table.
    pub columns: Vec<Column>,
    /// The start register for the result columns of the derived table;
    /// must be set before data is read from it.
    pub result_columns_start_reg: Option<usize>,
}

pub fn create_table(tbl_name: &str, body: &CreateTableBody, root_page: i64) -> Result<BTreeTable> {
    let table_name = normalize_ident(tbl_name);
    trace!("Creating table {}", table_name);
    let mut has_rowid = true;
    let mut has_autoincrement = false;
    let mut primary_key_columns = vec![];
    let mut foreign_keys = vec![];
    let mut cols = vec![];
    let is_strict: bool;
    let mut unique_sets: Vec<UniqueSet> = vec![];
    match body {
        CreateTableBody::ColumnsAndConstraints {
            columns,
            constraints,
            options,
        } => {
            is_strict = options.contains(TableOptions::STRICT);
            for c in constraints {
                if let ast::TableConstraint::PrimaryKey {
                    columns,
                    auto_increment,
                    ..
                } = &c.constraint
                {
                    if !primary_key_columns.is_empty() {
                        crate::bail_parse_error!(
                            "table \"{}\" has more than one primary key",
                            tbl_name
                        );
                    }
                    if *auto_increment {
                        has_autoincrement = true;
                    }

                    for column in columns {
                        let col_name = match column.expr.as_ref() {
                            Expr::Id(id) => normalize_ident(id.as_str()),
                            Expr::Literal(Literal::String(value)) => {
                                value.trim_matches('\'').to_owned()
                            }
                            expr => {
                                bail_parse_error!("unsupported primary key expression: {}", expr)
                            }
                        };
                        primary_key_columns
                            .push((col_name, column.order.unwrap_or(SortOrder::Asc)));
                    }
                    unique_sets.push(UniqueSet {
                        columns: primary_key_columns.clone(),
                        is_primary_key: true,
                    });
                } else if let ast::TableConstraint::Unique {
                    columns,
                    conflict_clause,
                } = &c.constraint
                {
                    if conflict_clause.is_some() {
                        unimplemented!("ON CONFLICT not implemented");
                    }
                    let mut unique_columns = Vec::with_capacity(columns.len());
                    for column in columns {
                        match column.expr.as_ref() {
                            Expr::Id(id) => unique_columns.push((
                                id.as_str().to_string(),
                                column.order.unwrap_or(SortOrder::Asc),
                            )),
                            Expr::Literal(Literal::String(value)) => unique_columns.push((
                                value.trim_matches('\'').to_owned(),
                                column.order.unwrap_or(SortOrder::Asc),
                            )),
                            expr => {
                                bail_parse_error!("unsupported unique key expression: {}", expr)
                            }
                        }
                    }
                    let unique_set = UniqueSet {
                        columns: unique_columns,
                        is_primary_key: false,
                    };
                    unique_sets.push(unique_set);
                } else if let ast::TableConstraint::ForeignKey {
                    columns,
                    clause,
                    defer_clause,
                } = &c.constraint
                {
                    let child_columns: Vec<String> = columns
                        .iter()
                        .map(|ic| normalize_ident(ic.col_name.as_str()))
                        .collect();
                    // derive parent columns: explicit or default to parent PK
                    let parent_table = normalize_ident(clause.tbl_name.as_str());
                    let parent_columns: Vec<String> = clause
                        .columns
                        .iter()
                        .map(|ic| normalize_ident(ic.col_name.as_str()))
                        .collect();

                    // Only check arity if parent columns were explicitly listed
                    if !parent_columns.is_empty() && child_columns.len() != parent_columns.len() {
                        crate::bail_parse_error!(
                            "foreign key on \"{}\" has {} child column(s) but {} parent column(s)",
                            tbl_name,
                            child_columns.len(),
                            parent_columns.len()
                        );
                    }
                    // deferrable semantics
                    let deferred = match defer_clause {
                        Some(d) => {
                            d.deferrable
                                && matches!(
                                    d.init_deferred,
                                    Some(InitDeferredPred::InitiallyDeferred)
                                )
                        }
                        None => false, // NOT DEFERRABLE INITIALLY IMMEDIATE by default
                    };
                    let fk = ForeignKey {
                        parent_table,
                        parent_columns,
                        child_columns,
                        on_delete: clause
                            .args
                            .iter()
                            .find_map(|a| {
                                if let ast::RefArg::OnDelete(x) = a {
                                    Some(*x)
                                } else {
                                    None
                                }
                            })
                            .unwrap_or(RefAct::NoAction),
                        on_update: clause
                            .args
                            .iter()
                            .find_map(|a| {
                                if let ast::RefArg::OnUpdate(x) = a {
                                    Some(*x)
                                } else {
                                    None
                                }
                            })
                            .unwrap_or(RefAct::NoAction),
                        deferred,
                    };
                    foreign_keys.push(Arc::new(fk));
                }
            }

            // Due to a bug in SQLite, this check is needed to maintain backwards compatibility with rowid alias
            // SQLite docs: https://sqlite.org/lang_createtable.html#rowids_and_the_integer_primary_key
            // Issue: https://github.com/tursodatabase/turso/issues/3665
            let mut primary_key_desc_columns_constraint = false;

            for ast::ColumnDefinition {
                col_name,
                col_type,
                constraints,
            } in columns
            {
                let name = col_name.as_str().to_string();
                // Regular sqlite tables have an integer rowid that uniquely identifies a row.
                // Even if you create a table with a column e.g. 'id INT PRIMARY KEY', there will still
                // be a separate hidden rowid, and the 'id' column will have a separate index built for it.
                //
                // However:
                // A column defined as exactly INTEGER PRIMARY KEY is a rowid alias, meaning that the rowid
                // and the value of this column are the same.
                // https://www.sqlite.org/lang_createtable.html#rowids_and_the_integer_primary_key
                let ty_str = col_type
                    .as_ref()
                    .cloned()
                    .map(|ast::Type { name, .. }| name.clone())
                    .unwrap_or_default();

                let mut typename_exactly_integer = false;
                let ty = match col_type {
                    Some(data_type) => {
                        let (ty, ei) = type_from_name(&data_type.name);
                        typename_exactly_integer = ei;
                        ty
                    }
                    None => Type::Null,
                };

                let mut default = None;
                let mut primary_key = false;
                let mut notnull = false;
                let mut order = SortOrder::Asc;
                let mut unique = false;
                let mut collation = None;
                for c_def in constraints {
                    match &c_def.constraint {
                        ast::ColumnConstraint::Check { .. } => {
                            crate::bail_parse_error!("CHECK constraints are not yet supported");
                        }
                        ast::ColumnConstraint::Generated { .. } => {
                            // todo(sivukhin): table_xinfo must be updated when generated columns will be supported in order to properly emit "hidden" column value
                            crate::bail_parse_error!("GENERATED columns are not yet supported");
                        }
                        ast::ColumnConstraint::PrimaryKey {
                            order: o,
                            auto_increment,
                            conflict_clause,
                            ..
                        } => {
                            if conflict_clause.is_some() {
                                crate::bail_parse_error!(
                                    "ON CONFLICT not implemented for column definition"
                                );
                            }
                            if !primary_key_columns.is_empty() {
                                crate::bail_parse_error!(
                                    "table \"{}\" has more than one primary key",
                                    tbl_name
                                );
                            }
                            primary_key = true;
                            if *auto_increment {
                                has_autoincrement = true;
                            }
                            if let Some(o) = o {
                                order = *o;
                            }
                            unique_sets.push(UniqueSet {
                                columns: vec![(name.clone(), order)],
                                is_primary_key: true,
                            });
                        }
                        ast::ColumnConstraint::NotNull {
                            nullable,
                            conflict_clause,
                            ..
                        } => {
                            if conflict_clause.is_some() {
                                crate::bail_parse_error!(
                                    "ON CONFLICT not implemented for column definition"
                                );
                            }
                            notnull = !nullable;
                        }
                        ast::ColumnConstraint::Default(ref expr) => {
                            default = Some(
                                translate_ident_to_string_literal(expr).unwrap_or(expr.clone()),
                            );
                        }
                        // TODO: for now we don't check Resolve type of unique
                        ast::ColumnConstraint::Unique(conflict) => {
                            if conflict.is_some() {
                                crate::bail_parse_error!(
                                    "ON CONFLICT not implemented for column definition"
                                );
                            }
                            unique = true;
                            unique_sets.push(UniqueSet {
                                columns: vec![(name.clone(), order)],
                                is_primary_key: false,
                            });
                        }
                        ast::ColumnConstraint::Collate { ref collation_name } => {
                            collation = Some(CollationSeq::new(collation_name.as_str())?);
                        }
                        ast::ColumnConstraint::ForeignKey {
                            clause,
                            defer_clause,
                        } => {
                            let fk = ForeignKey {
                                parent_table: normalize_ident(clause.tbl_name.as_str()),
                                parent_columns: clause
                                    .columns
                                    .iter()
                                    .map(|c| normalize_ident(c.col_name.as_str()))
                                    .collect(),
                                on_delete: clause
                                    .args
                                    .iter()
                                    .find_map(|arg| {
                                        if let ast::RefArg::OnDelete(act) = arg {
                                            Some(*act)
                                        } else {
                                            None
                                        }
                                    })
                                    .unwrap_or(RefAct::NoAction),
                                on_update: clause
                                    .args
                                    .iter()
                                    .find_map(|arg| {
                                        if let ast::RefArg::OnUpdate(act) = arg {
                                            Some(*act)
                                        } else {
                                            None
                                        }
                                    })
                                    .unwrap_or(RefAct::NoAction),
                                child_columns: vec![name.clone()],
                                deferred: match defer_clause {
                                    Some(d) => {
                                        d.deferrable
                                            && matches!(
                                                d.init_deferred,
                                                Some(InitDeferredPred::InitiallyDeferred)
                                            )
                                    }
                                    None => false,
                                },
                            };
                            foreign_keys.push(Arc::new(fk));
                        }
                    }
                }

                if primary_key {
                    primary_key_columns.push((name.clone(), order));
                    if order == SortOrder::Desc {
                        primary_key_desc_columns_constraint = true;
                    }
                } else if primary_key_columns
                    .iter()
                    .any(|(col_name, _)| col_name == &name)
                {
                    primary_key = true;
                }

                cols.push(Column::new(
                    Some(normalize_ident(&name)),
                    ty_str,
                    default,
                    ty,
                    collation,
                    ColDef {
                        primary_key,
                        rowid_alias: typename_exactly_integer
                            && primary_key
                            && !primary_key_desc_columns_constraint,
                        notnull,
                        unique,
                        hidden: false,
                    },
                ));
            }
            if options.contains(TableOptions::WITHOUT_ROWID) {
                has_rowid = false;
            }
        }
        CreateTableBody::AsSelect(_) => todo!(),
    };

    // flip is_rowid_alias back to false if the table has multiple primary key columns
    // or if the table has no rowid
    if !has_rowid || primary_key_columns.len() > 1 {
        for col in cols.iter_mut() {
            col.set_rowid_alias(false);
        }
    }

    if has_autoincrement {
        // only allow integers
        if primary_key_columns.len() != 1 {
            crate::bail_parse_error!("AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY");
        }
        let pk_col_name = &primary_key_columns[0].0;
        let pk_col = cols.iter().find(|c| c.name.as_deref() == Some(pk_col_name));

        if let Some(col) = pk_col {
            if col.ty() != Type::Integer {
                crate::bail_parse_error!("AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY");
            }
        }
    }

    for col in cols.iter() {
        if col.is_rowid_alias() {
            // Unique sets are used for creating automatic indexes. An index is not created for a rowid alias PRIMARY KEY.
            // However, an index IS created for a rowid alias UNIQUE, e.g. CREATE TABLE t(x INTEGER PRIMARY KEY, UNIQUE(x))
            let unique_set_w_only_rowid_alias = unique_sets.iter().position(|us| {
                us.is_primary_key
                    && us.columns.len() == 1
                    && &us.columns.first().unwrap().0 == col.name.as_ref().unwrap()
            });
            if let Some(u) = unique_set_w_only_rowid_alias {
                unique_sets.remove(u);
            }
        }
    }

    Ok(BTreeTable {
        root_page,
        name: table_name,
        has_rowid,
        primary_key_columns,
        has_autoincrement,
        columns: cols,
        is_strict,
        foreign_keys,
        unique_sets: {
            // If there are any unique sets that have identical column names in the same order (even if they are PRIMARY KEY and UNIQUE and have different sort orders), remove the duplicates.
            // Examples:
            // PRIMARY KEY (a, b) and UNIQUE (a desc, b) are the same
            // PRIMARY KEY (a, b) and UNIQUE (b, a) are not the same
            // Using a n^2 monkey algorithm here because n is small, CPUs are fast, life is short, and most importantly:
            // we want to preserve the order of the sets -- automatic index names in sqlite_schema must be in definition order.
            let mut i = 0;
            while i < unique_sets.len() {
                let mut j = i + 1;
                while j < unique_sets.len() {
                    let lengths_equal =
                        unique_sets[i].columns.len() == unique_sets[j].columns.len();
                    if lengths_equal
                        && unique_sets[i]
                            .columns
                            .iter()
                            .zip(unique_sets[j].columns.iter())
                            .all(|((a_name, _), (b_name, _))| a_name == b_name)
                    {
                        unique_sets.remove(j);
                    } else {
                        j += 1;
                    }
                }
                i += 1;
            }
            unique_sets
        },
    })
}

pub fn translate_ident_to_string_literal(expr: &Expr) -> Option<Box<Expr>> {
    match expr {
        Expr::Name(name) => Some(Box::new(Expr::Literal(Literal::String(name.as_literal())))),
        _ => None,
    }
}

pub fn _build_pseudo_table(columns: &[ResultColumn]) -> PseudoCursorType {
    let table = PseudoCursorType::new();
    for column in columns {
        match column {
            ResultColumn::Expr(expr, _as_name) => {
                todo!("unsupported expression {:?}", expr);
            }
            ResultColumn::Star => {
                todo!();
            }
            ResultColumn::TableStar(_) => {
                todo!();
            }
        }
    }
    table
}

#[derive(Debug, Clone)]
pub struct ForeignKey {
    /// Columns in this table (child side)
    pub child_columns: Vec<String>,
    /// Referenced (parent) table
    pub parent_table: String,
    /// Parent-side referenced columns
    pub parent_columns: Vec<String>,
    pub on_delete: RefAct,
    pub on_update: RefAct,
    /// DEFERRABLE INITIALLY DEFERRED
    pub deferred: bool,
}
impl ForeignKey {
    fn validate(&self) -> Result<()> {
        // TODO: remove this when actions are implemented
        if !(matches!(self.on_update, RefAct::NoAction)
            && matches!(self.on_delete, RefAct::NoAction))
        {
            crate::bail_parse_error!(
                "foreign key actions other than NO ACTION are not implemented"
            );
        }
        if self
            .parent_columns
            .iter()
            .any(|c| ROWID_STRS.iter().any(|&r| r.eq_ignore_ascii_case(c)))
        {
            return Err(crate::LimboError::Constraint(format!(
                "foreign key mismatch referencing \"{}\"",
                self.parent_table
            )));
        }
        Ok(())
    }
}

/// A single resolved foreign key where `parent_table == target`.
#[derive(Clone, Debug)]
pub struct ResolvedFkRef {
    /// Child table that owns the FK.
    pub child_table: Arc<BTreeTable>,
    /// The FK as declared on the child table.
    pub fk: Arc<ForeignKey>,

    /// Resolved, normalized column names.
    pub child_cols: Vec<String>,
    /// Column positions in the child/parent tables (pos_in_table)
    pub child_pos: Vec<usize>,
    pub parent_pos: Vec<usize>,

    /// If the parent key is rowid or a rowid-alias (single-column only)
    pub parent_uses_rowid: bool,
    /// For non-rowid parents: the UNIQUE index that enforces the parent key.
    /// (None when `parent_uses_rowid == true`.)
    pub parent_unique_index: Option<Arc<Index>>,
}

impl ResolvedFkRef {
    /// Returns if any referenced parent column can change when these column positions are updated.
    pub fn parent_key_may_change(
        &self,
        updated_parent_positions: &HashSet<usize>,
        parent_tbl: &BTreeTable,
    ) -> bool {
        if self.parent_uses_rowid {
            // parent rowid changes if the parent's rowid or alias is updated
            if let Some((idx, _)) = parent_tbl
                .columns
                .iter()
                .enumerate()
                .find(|(_, c)| c.is_rowid_alias())
            {
                return updated_parent_positions.contains(&idx);
            }
            // Without a rowid alias, a direct rowid update is represented separately with ROWID_SENTINEL
            return true;
        }
        self.parent_pos
            .iter()
            .any(|p| updated_parent_positions.contains(p))
    }

    /// Returns if any child column of this FK is in `updated_child_positions`
    pub fn child_key_changed(
        &self,
        updated_child_positions: &HashSet<usize>,
        child_tbl: &BTreeTable,
    ) -> bool {
        if self
            .child_pos
            .iter()
            .any(|p| updated_child_positions.contains(p))
        {
            return true;
        }
        // special case: if FK uses a rowid alias on child, and rowid changed
        if self.child_cols.len() == 1 {
            let (i, col) = child_tbl.get_column(&self.child_cols[0]).unwrap();
            if col.is_rowid_alias() && updated_child_positions.contains(&i) {
                return true;
            }
        }
        false
    }
}

#[derive(Debug, Clone)]
pub struct Column {
    pub name: Option<String>,
    pub ty_str: String,
    pub default: Option<Box<Expr>>,
    raw: u16,
}

#[derive(Default)]
pub struct ColDef {
    pub primary_key: bool,
    pub rowid_alias: bool,
    pub notnull: bool,
    pub unique: bool,
    pub hidden: bool,
}

// flags
const F_PRIMARY_KEY: u16 = 1;
const F_ROWID_ALIAS: u16 = 2;
const F_NOTNULL: u16 = 4;
const F_UNIQUE: u16 = 8;
const F_HIDDEN: u16 = 16;

// pack Type and Collation in the remaining bits
const TYPE_SHIFT: u16 = 5;
const TYPE_MASK: u16 = 0b111 << TYPE_SHIFT;
const COLL_SHIFT: u16 = TYPE_SHIFT + 3;
const COLL_MASK: u16 = 0b11 << COLL_SHIFT;

impl Column {
    pub fn affinity(&self) -> Affinity {
        Affinity::affinity(&self.ty_str)
    }
    pub fn new_default_text(
        name: Option<String>,
        ty_str: String,
        default: Option<Box<Expr>>,
    ) -> Self {
        Self::new(name, ty_str, default, Type::Text, None, ColDef::default())
    }
    pub fn new_default_integer(
        name: Option<String>,
        ty_str: String,
        default: Option<Box<Expr>>,
    ) -> Self {
        Self::new(
            name,
            ty_str,
            default,
            Type::Integer,
            None,
            ColDef::default(),
        )
    }
    #[inline]
    pub const fn new(
        name: Option<String>,
        ty_str: String,
        default: Option<Box<Expr>>,
        ty: Type,
        col: Option<CollationSeq>,
        coldef: ColDef,
    ) -> Self {
        let mut raw = 0u16;
        raw |= (ty as u16) << TYPE_SHIFT;
        if let Some(c) = col {
            raw |= (c as u16) << COLL_SHIFT;
        }
        if coldef.primary_key {
            raw |= F_PRIMARY_KEY
        }
        if coldef.rowid_alias {
            raw |= F_ROWID_ALIAS
        }
        if coldef.notnull {
            raw |= F_NOTNULL
        }
        if coldef.unique {
            raw |= F_UNIQUE
        }
        if coldef.hidden {
            raw |= F_HIDDEN
        }
        Self {
            name,
            ty_str,
            default,
            raw,
        }
    }
    #[inline]
    pub const fn ty(&self) -> Type {
        let v = ((self.raw & TYPE_MASK) >> TYPE_SHIFT) as u8;
        Type::from_bits(v)
    }

    #[inline]
    pub const fn set_ty(&mut self, ty: Type) {
        self.raw = (self.raw & !TYPE_MASK) | (((ty as u16) << TYPE_SHIFT) & TYPE_MASK);
    }

    #[inline]
    pub const fn collation_opt(&self) -> Option<CollationSeq> {
        if self.has_explicit_collation() {
            Some(self.collation())
        } else {
            None
        }
    }

    #[inline]
    pub const fn collation(&self) -> CollationSeq {
        let v = ((self.raw & COLL_MASK) >> COLL_SHIFT) as u8;
        CollationSeq::from_bits(v)
    }

    #[inline]
    pub const fn has_explicit_collation(&self) -> bool {
        let v = ((self.raw & COLL_MASK) >> COLL_SHIFT) as u8;
        v != CollationSeq::Unset as u8
    }

    #[inline]
    pub const fn set_collation(&mut self, c: Option<CollationSeq>) {
        if let Some(c) = c {
            self.raw = (self.raw & !COLL_MASK) | (((c as u16) << COLL_SHIFT) & COLL_MASK);
        }
    }

    #[inline]
    pub fn primary_key(&self) -> bool {
        self.raw & F_PRIMARY_KEY != 0
    }
    #[inline]
    pub const fn is_rowid_alias(&self) -> bool {
        self.raw & F_ROWID_ALIAS != 0
    }
    #[inline]
    pub const fn notnull(&self) -> bool {
        self.raw & F_NOTNULL != 0
    }
    #[inline]
    pub const fn unique(&self) -> bool {
        self.raw & F_UNIQUE != 0
    }
    #[inline]
    pub const fn hidden(&self) -> bool {
        self.raw & F_HIDDEN != 0
    }

    #[inline]
    pub const fn set_primary_key(&mut self, v: bool) {
        self.set_flag(F_PRIMARY_KEY, v);
    }
    #[inline]
    pub const fn set_rowid_alias(&mut self, v: bool) {
        self.set_flag(F_ROWID_ALIAS, v);
    }
    #[inline]
    pub const fn set_notnull(&mut self, v: bool) {
        self.set_flag(F_NOTNULL, v);
    }
    #[inline]
    pub const fn set_unique(&mut self, v: bool) {
        self.set_flag(F_UNIQUE, v);
    }
    #[inline]
    pub const fn set_hidden(&mut self, v: bool) {
        self.set_flag(F_HIDDEN, v);
    }

    #[inline]
    const fn set_flag(&mut self, mask: u16, val: bool) {
        if val {
            self.raw |= mask
        } else {
            self.raw &= !mask
        }
    }
}

// TODO: This might replace some of util::columns_from_create_table_body
impl From<&ColumnDefinition> for Column {
    fn from(value: &ColumnDefinition) -> Self {
        let name = value.col_name.as_str();

        let mut default = None;
        let mut notnull = false;
        let mut primary_key = false;
        let mut unique = false;
        let mut collation = None;

        for ast::NamedColumnConstraint { constraint, .. } in &value.constraints {
            match constraint {
                ast::ColumnConstraint::PrimaryKey { .. } => primary_key = true,
                ast::ColumnConstraint::NotNull { .. } => notnull = true,
                ast::ColumnConstraint::Unique(..) => unique = true,
                ast::ColumnConstraint::Default(expr) => {
                    default
                        .replace(translate_ident_to_string_literal(expr).unwrap_or(expr.clone()));
                }
                ast::ColumnConstraint::Collate { collation_name } => {
                    collation.replace(
                        CollationSeq::new(collation_name.as_str())
                            .expect("collation should have been set correctly in create table"),
                    );
                }
                _ => {}
            };
        }

        let ty = match value.col_type {
            Some(ref data_type) => type_from_name(&data_type.name).0,
            None => Type::Null,
        };

        let ty_str = value
            .col_type
            .as_ref()
            .map(|t| t.name.to_string())
            .unwrap_or_default();

        let hidden = ty_str.contains("HIDDEN");

        Column::new(
            Some(normalize_ident(name)),
            ty_str,
            default,
            ty,
            collation,
            ColDef {
                primary_key,
                rowid_alias: primary_key && matches!(ty, Type::Integer),
                notnull,
                unique,
                hidden,
            },
        )
    }
}

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Type {
    Null = 0,
    Text = 1,
    Numeric = 2,
    Integer = 3,
    Real = 4,
    Blob = 5,
}

impl Type {
    #[inline]
    const fn from_bits(bits: u8) -> Self {
        match bits {
            0 => Type::Null,
            1 => Type::Text,
            2 => Type::Numeric,
            3 => Type::Integer,
            4 => Type::Real,
            5 => Type::Blob,
            _ => Type::Null,
        }
    }
}

impl fmt::Display for Type {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            Self::Null => "",
            Self::Text => "TEXT",
            Self::Numeric => "NUMERIC",
            Self::Integer => "INTEGER",
            Self::Real => "REAL",
            Self::Blob => "BLOB",
        };
        write!(f, "{s}")
    }
}

pub fn sqlite_schema_table() -> BTreeTable {
    BTreeTable {
        root_page: 1,
        name: "sqlite_schema".to_string(),
        has_rowid: true,
        is_strict: false,
        has_autoincrement: false,
        primary_key_columns: vec![],
        columns: vec![
            Column::new_default_text(Some("type".to_string()), "TEXT".to_string(), None),
            Column::new_default_text(Some("name".to_string()), "TEXT".to_string(), None),
            Column::new_default_text(Some("tbl_name".to_string()), "TEXT".to_string(), None),
            Column::new_default_integer(Some("rootpage".to_string()), "INT".to_string(), None),
            Column::new_default_text(Some("sql".to_string()), "TEXT".to_string(), None),
        ],
        foreign_keys: vec![],
        unique_sets: vec![],
    }
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct Index {
    pub name: String,
    pub table_name: String,
    pub root_page: i64,
    pub columns: Vec<IndexColumn>,
    pub unique: bool,
    pub ephemeral: bool,
    /// Does the index have a rowid as the last column?
    /// This is the case for btree indexes (persistent or ephemeral) that
    /// have been created based on a table with a rowid.
    /// For example, WITHOUT ROWID tables (not supported in Limbo yet),
    /// and  SELECT DISTINCT ephemeral indexes will not have a rowid.
    pub has_rowid: bool,
    pub where_clause: Option<Box<Expr>>,
    pub index_method: Option<Arc<dyn IndexMethodAttachment>>,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct IndexColumn {
    pub name: String,
    pub order: SortOrder,
    /// the position of the column in the source table.
    /// for example:
    /// CREATE TABLE t (a,b,c)
    /// CREATE INDEX idx ON t(b)
    /// b.pos_in_table == 1
    pub pos_in_table: usize,
    pub collation: Option<CollationSeq>,
    pub default: Option<Box<Expr>>,
}

impl Index {
    pub fn from_sql(
        syms: &SymbolTable,
        sql: &str,
        root_page: i64,
        table: &BTreeTable,
    ) -> Result<Index> {
        let mut parser = Parser::new(sql.as_bytes());
        let cmd = parser.next_cmd()?;
        match cmd {
            Some(Cmd::Stmt(Stmt::CreateIndex {
                idx_name,
                tbl_name,
                columns,
                unique,
                where_clause,
                using,
                with_clause,
                ..
            })) => {
                let index_name = normalize_ident(idx_name.name.as_str());
                let index_columns = resolve_sorted_columns(table, &columns)?;
                if let Some(using) = using {
                    if where_clause.is_some() {
                        bail_parse_error!("custom index module do not support partial indices");
                    }
                    if unique {
                        bail_parse_error!("custom index module do not support UNIQUE indices");
                    }
                    let parameters = resolve_index_method_parameters(with_clause)?;
                    let Some(module) = syms.index_methods.get(using.as_str()) else {
                        bail_parse_error!("unknown module name: '{}'", using);
                    };
                    let configuration = IndexMethodConfiguration {
                        table_name: table.name.clone(),
                        index_name: index_name.clone(),
                        columns: index_columns.clone(),
                        parameters,
                    };
                    let descriptor = module.attach(&configuration)?;
                    Ok(Index {
                        name: index_name,
                        table_name: normalize_ident(tbl_name.as_str()),
                        root_page,
                        columns: index_columns,
                        unique: false,
                        ephemeral: false,
                        has_rowid: table.has_rowid,
                        where_clause: None,
                        index_method: Some(descriptor),
                    })
                } else {
                    Ok(Index {
                        name: index_name,
                        table_name: normalize_ident(tbl_name.as_str()),
                        root_page,
                        columns: index_columns,
                        unique,
                        ephemeral: false,
                        has_rowid: table.has_rowid,
                        where_clause,
                        index_method: None,
                    })
                }
            }
            _ => todo!("Expected create index statement"),
        }
    }

    /// check if this is special backing_btree index created and managed by custom index_method
    pub fn is_backing_btree_index(&self) -> bool {
        self.index_method
            .as_ref()
            .is_some_and(|x| x.definition().backing_btree)
    }

    pub fn automatic_from_primary_key(
        table: &BTreeTable,
        auto_index: (String, i64), // name, root_page
        column_count: usize,
    ) -> Result<Index> {
        let has_primary_key_index =
            table.get_rowid_alias_column().is_none() && !table.primary_key_columns.is_empty();
        assert!(has_primary_key_index);
        let (index_name, root_page) = auto_index;

        let mut primary_keys = Vec::with_capacity(column_count);
        for (col_name, order) in table.primary_key_columns.iter() {
            let Some((pos_in_table, _)) = table.get_column(col_name) else {
                return Err(crate::LimboError::ParseError(format!(
                    "Column {} not found in table {}",
                    col_name, table.name
                )));
            };
            let (_, column) = table.get_column(col_name).unwrap();
            primary_keys.push(IndexColumn {
                name: normalize_ident(col_name),
                order: *order,
                pos_in_table,
                collation: column.collation_opt(),
                default: column.default.clone(),
            });
        }

        assert!(primary_keys.len() == column_count);

        Ok(Index {
            name: normalize_ident(index_name.as_str()),
            table_name: table.name.clone(),
            root_page,
            columns: primary_keys,
            unique: true,
            ephemeral: false,
            has_rowid: table.has_rowid,
            where_clause: None,
            index_method: None,
        })
    }

    pub fn automatic_from_unique(
        table: &BTreeTable,
        auto_index: (String, i64), // name, root_page
        column_indices_and_sort_orders: Vec<(usize, SortOrder)>,
    ) -> Result<Index> {
        let (index_name, root_page) = auto_index;

        let unique_cols = table
            .columns
            .iter()
            .enumerate()
            .filter_map(|(pos_in_table, col)| {
                let (pos_in_table, sort_order) = column_indices_and_sort_orders
                    .iter()
                    .find(|(pos, _)| *pos == pos_in_table)?;
                Some(IndexColumn {
                    name: normalize_ident(col.name.as_ref().unwrap()),
                    order: *sort_order,
                    pos_in_table: *pos_in_table,
                    collation: col.collation_opt(),
                    default: col.default.clone(),
                })
            })
            .collect::<Vec<_>>();

        Ok(Index {
            name: normalize_ident(index_name.as_str()),
            table_name: table.name.clone(),
            root_page,
            columns: unique_cols,
            unique: true,
            ephemeral: false,
            has_rowid: table.has_rowid,
            where_clause: None,
            index_method: None,
        })
    }

    /// Given a column position in the table, return the position in the index.
    /// Returns None if the column is not found in the index.
    /// For example, given:
    /// CREATE TABLE t (a, b, c)
    /// CREATE INDEX idx ON t(b)
    /// then column_table_pos_to_index_pos(1) returns Some(0)
    pub fn column_table_pos_to_index_pos(&self, table_pos: usize) -> Option<usize> {
        self.columns
            .iter()
            .position(|c| c.pos_in_table == table_pos)
    }

    /// Walk the where_clause Expr of a partial index and validate that it doesn't reference any other
    /// tables or use any disallowed constructs.
    pub fn validate_where_expr(&self, table: &Table) -> bool {
        let Some(where_clause) = &self.where_clause else {
            return true;
        };

        let tbl_norm = normalize_ident(self.table_name.as_str());
        let has_col = |name: &str| {
            let n = normalize_ident(name);
            table
                .columns()
                .iter()
                .any(|c| c.name.as_ref().is_some_and(|cn| normalize_ident(cn) == n))
        };
        let is_tbl = |ns: &str| normalize_ident(ns).eq_ignore_ascii_case(&tbl_norm);
        let is_deterministic_fn = |name: &str, argc: usize| {
            let n = normalize_ident(name);
            Func::resolve_function(&n, argc).is_ok_and(|f| f.is_deterministic())
        };

        let mut ok = true;
        let _ = walk_expr(where_clause.as_ref(), &mut |e: &Expr| -> crate::Result<
            WalkControl,
        > {
            if !ok {
                return Ok(WalkControl::SkipChildren);
            }
            match e {
                Expr::Literal(_) | Expr::RowId { .. } => {}
                // Unqualified identifier: must be a column of the target table or ROWID
                Expr::Id(n) => {
                    let n = n.as_str();
                    if !ROWID_STRS.iter().any(|s| s.eq_ignore_ascii_case(n)) && !has_col(n) {
                        ok = false;
                    }
                }
                // Qualified: qualifier must match this index's table; column must exist
                Expr::Qualified(ns, col) | Expr::DoublyQualified(_, ns, col) => {
                    if !is_tbl(ns.as_str()) || !has_col(col.as_str()) {
                        ok = false;
                    }
                }
                Expr::FunctionCall {
                    name, filter_over, ..
                }
                | Expr::FunctionCallStar {
                    name, filter_over, ..
                } => {
                    // reject windowed
                    if filter_over.over_clause.is_some() {
                        ok = false;
                    } else {
                        let argc = match e {
                            Expr::FunctionCall { args, .. } => args.len(),
                            Expr::FunctionCallStar { .. } => 0,
                            _ => unreachable!(),
                        };
                        if !is_deterministic_fn(name.as_str(), argc) {
                            ok = false;
                        }
                    }
                }
                // Explicitly disallowed constructs
                Expr::Exists(_)
                | Expr::InSelect { .. }
                | Expr::Subquery(_)
                | Expr::Raise { .. }
                | Expr::Variable(_) => {
                    ok = false;
                }
                _ => {}
            }
            Ok(if ok {
                WalkControl::Continue
            } else {
                WalkControl::SkipChildren
            })
        });
        ok
    }

    pub fn bind_where_expr(
        &self,
        table_refs: Option<&mut TableReferences>,
        connection: &Arc<Connection>,
    ) -> Option<ast::Expr> {
        let Some(where_clause) = &self.where_clause else {
            return None;
        };
        let mut expr = where_clause.clone();
        bind_and_rewrite_expr(
            &mut expr,
            table_refs,
            None,
            connection,
            BindingBehavior::ResultColumnsNotAllowed,
        )
        .ok()?;
        Some(*expr)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    pub fn test_has_rowid_true() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        assert!(table.has_rowid, "has_rowid should be set to true");
        Ok(())
    }

    #[test]
    pub fn test_has_rowid_false() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        assert!(!table.has_rowid, "has_rowid should be set to false");
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_text() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a TEXT PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a´ has type different than INTEGER so can't be a rowid alias"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            column.is_rowid_alias(),
            "column 'a´ should be a rowid alias"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition() -> Result<()>
    {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            column.is_rowid_alias(),
            "column 'a´ should be a rowid alias"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer_separate_primary_key_definition_without_rowid(
    ) -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a)) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a´ shouldn't be a rowid alias because table has no rowid"
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_single_integer_without_rowid() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT) WITHOUT ROWID;"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a´ shouldn't be a rowid alias because table has no rowid"
        );
        Ok(())
    }

    #[test]
    pub fn test_multiple_pk_forbidden() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY);"#;
        let table = BTreeTable::from_sql(sql, 0);
        let error = table.unwrap_err();
        assert!(
            matches!(error, LimboError::ParseError(e) if e.contains("table \"t1\" has more than one primary key"))
        );
        Ok(())
    }

    #[test]
    pub fn test_column_is_rowid_alias_separate_composite_primary_key_definition() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(
            !column.is_rowid_alias(),
            "column 'a´ shouldn't be a rowid alias because table has composite primary key"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_inline_single() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT, c REAL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Asc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_inline_multiple_forbidden() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT PRIMARY KEY, c REAL);"#;
        let table = BTreeTable::from_sql(sql, 0);
        let error = table.unwrap_err();
        assert!(
            matches!(error, LimboError::ParseError(e) if e.contains("table \"t1\" has more than one primary key"))
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_single() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a desc));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Desc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_multiple() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY(a, b desc));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(column.primary_key(), "column 'b' shouldn be a primary key");
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![
                ("a".to_string(), SortOrder::Asc),
                ("b".to_string(), SortOrder::Desc)
            ],
            table.primary_key_columns,
            "primary key column names should be ['a', 'b']"
        );
        Ok(())
    }

    #[test]
    pub fn test_primary_key_separate_single_quoted() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY('a'));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Asc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }
    #[test]
    pub fn test_primary_key_separate_single_doubly_quoted() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, c REAL, PRIMARY KEY("a"));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.primary_key(), "column 'a' should be a primary key");
        let column = table.get_column("b").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'b' shouldn't be a primary key"
        );
        let column = table.get_column("c").unwrap().1;
        assert!(
            !column.primary_key(),
            "column 'c' shouldn't be a primary key"
        );
        assert_eq!(
            vec![("a".to_string(), SortOrder::Asc)],
            table.primary_key_columns,
            "primary key column names should be ['a']"
        );
        Ok(())
    }

    #[test]
    pub fn test_default_value() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER DEFAULT 23);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        let default = column.default.clone().unwrap();
        assert_eq!(default.to_string(), "23");
        Ok(())
    }

    #[test]
    pub fn test_col_notnull() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER NOT NULL);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(column.notnull());
        Ok(())
    }

    #[test]
    pub fn test_col_notnull_negative() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert!(!column.notnull());
        Ok(())
    }

    #[test]
    pub fn test_col_type_string_integer() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a InTeGeR);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let column = table.get_column("a").unwrap().1;
        assert_eq!(column.ty_str, "InTeGeR");
        Ok(())
    }

    #[test]
    pub fn test_sqlite_schema() {
        let expected = r#"CREATE TABLE sqlite_schema (type TEXT, name TEXT, tbl_name TEXT, rootpage INT, sql TEXT)"#;
        let actual = sqlite_schema_table().to_sql();
        assert_eq!(expected, actual);
    }

    #[test]
    pub fn test_special_column_names() -> Result<()> {
        let tests = [
            ("foobar", "CREATE TABLE t (foobar TEXT)"),
            ("_table_name3", "CREATE TABLE t (_table_name3 TEXT)"),
            ("special name", "CREATE TABLE t ([special name] TEXT)"),
            ("foo&bar", "CREATE TABLE t ([foo&bar] TEXT)"),
            (" name", "CREATE TABLE t ([ name] TEXT)"),
        ];

        for (input_column_name, expected_sql) in tests {
            let sql = format!("CREATE TABLE t ([{input_column_name}] TEXT)");
            let actual = BTreeTable::from_sql(&sql, 0)?.to_sql();
            assert_eq!(expected_sql, actual);
        }

        Ok(())
    }

    #[test]
    #[should_panic]
    fn test_automatic_index_single_column() {
        // Without composite primary keys, we should not have an automatic index on a primary key that is a rowid alias
        let sql = r#"CREATE TABLE t1 (a INTEGER PRIMARY KEY, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0).unwrap();
        let _index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)
                .unwrap();
    }

    #[test]
    fn test_automatic_index_composite_key() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT, PRIMARY KEY(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 2)?;

        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        assert!(matches!(index.columns[1].order, SortOrder::Asc));
        Ok(())
    }

    #[test]
    #[should_panic]
    fn test_automatic_index_no_primary_key() {
        let sql = r#"CREATE TABLE t1 (a INTEGER, b TEXT);"#;
        let table = BTreeTable::from_sql(sql, 0).unwrap();
        Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)
            .unwrap();
    }

    #[test]
    fn test_automatic_index_nonexistent_column() {
        // Create a table with a primary key column that doesn't exist in the table
        let table = BTreeTable {
            root_page: 0,
            name: "t1".to_string(),
            has_rowid: true,
            is_strict: false,
            has_autoincrement: false,
            primary_key_columns: vec![("nonexistent".to_string(), SortOrder::Asc)],
            columns: vec![Column::new_default_integer(
                Some("a".to_string()),
                "INT".to_string(),
                None,
            )],
            unique_sets: vec![],
            foreign_keys: vec![],
        };

        let result =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1);
        assert!(result.is_err());
    }

    #[test]
    fn test_automatic_index_unique_column() -> Result<()> {
        let sql = r#"CREATE table t1 (x INTEGER, y INTEGER UNIQUE);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index = Index::automatic_from_unique(
            &table,
            ("sqlite_autoindex_t1_1".to_string(), 2),
            vec![(1, SortOrder::Asc)],
        )?;

        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "y");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        Ok(())
    }

    #[test]
    fn test_automatic_index_pkey_unique_column() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (x PRIMARY KEY, y UNIQUE);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let indices = [
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?,
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_2".to_string(), 3),
                vec![(1, SortOrder::Asc)],
            )?,
        ];

        assert_eq!(indices[0].name, "sqlite_autoindex_t1_1");
        assert_eq!(indices[0].table_name, "t1");
        assert_eq!(indices[0].root_page, 2);
        assert!(indices[0].unique);
        assert_eq!(indices[0].columns.len(), 1);
        assert_eq!(indices[0].columns[0].name, "x");
        assert!(matches!(indices[0].columns[0].order, SortOrder::Asc));

        assert_eq!(indices[1].name, "sqlite_autoindex_t1_2");
        assert_eq!(indices[1].table_name, "t1");
        assert_eq!(indices[1].root_page, 3);
        assert!(indices[1].unique);
        assert_eq!(indices[1].columns.len(), 1);
        assert_eq!(indices[1].columns[0].name, "y");
        assert!(matches!(indices[1].columns[0].order, SortOrder::Asc));

        Ok(())
    }

    #[test]
    fn test_automatic_index_pkey_many_unique_columns() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a PRIMARY KEY, b UNIQUE, c, d, UNIQUE(c, d));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let auto_indices = [
            ("sqlite_autoindex_t1_1".to_string(), 2),
            ("sqlite_autoindex_t1_2".to_string(), 3),
            ("sqlite_autoindex_t1_3".to_string(), 4),
        ];
        let indices = vec![
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?,
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_2".to_string(), 3),
                vec![(1, SortOrder::Asc)],
            )?,
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_3".to_string(), 4),
                vec![(2, SortOrder::Asc), (3, SortOrder::Asc)],
            )?,
        ];

        assert!(indices.len() == auto_indices.len());

        for (pos, index) in indices.iter().enumerate() {
            let (index_name, root_page) = &auto_indices[pos];
            assert_eq!(index.name, *index_name);
            assert_eq!(index.table_name, "t1");
            assert_eq!(index.root_page, *root_page);
            assert!(index.unique);

            if pos == 0 {
                assert_eq!(index.columns.len(), 1);
                assert_eq!(index.columns[0].name, "a");
            } else if pos == 1 {
                assert_eq!(index.columns.len(), 1);
                assert_eq!(index.columns[0].name, "b");
            } else if pos == 2 {
                assert_eq!(index.columns.len(), 2);
                assert_eq!(index.columns[0].name, "c");
                assert_eq!(index.columns[1].name, "d");
            }

            assert!(matches!(index.columns[0].order, SortOrder::Asc));
        }

        Ok(())
    }

    #[test]
    fn test_automatic_index_unique_set_dedup() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a, b, UNIQUE(a, b), UNIQUE(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index = Index::automatic_from_unique(
            &table,
            ("sqlite_autoindex_t1_1".to_string(), 2),
            vec![(0, SortOrder::Asc), (1, SortOrder::Asc)],
        )?;

        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[1].order, SortOrder::Asc));

        Ok(())
    }

    #[test]
    fn test_automatic_index_primary_key_is_unique() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a primary key unique);"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 1)?;

        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));

        Ok(())
    }

    #[test]
    fn test_automatic_index_primary_key_is_unique_and_composite() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a, b, PRIMARY KEY(a, b), UNIQUE(a, b));"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let index =
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_1".to_string(), 2), 2)?;

        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 2);
        assert_eq!(index.columns[0].name, "a");
        assert_eq!(index.columns[1].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));

        Ok(())
    }

    #[test]
    fn test_automatic_index_unique_and_a_pk() -> Result<()> {
        let sql = r#"CREATE TABLE t1 (a NUMERIC UNIQUE UNIQUE,  b TEXT PRIMARY KEY)"#;
        let table = BTreeTable::from_sql(sql, 0)?;
        let mut indexes = vec![
            Index::automatic_from_unique(
                &table,
                ("sqlite_autoindex_t1_1".to_string(), 2),
                vec![(0, SortOrder::Asc)],
            )?,
            Index::automatic_from_primary_key(&table, ("sqlite_autoindex_t1_2".to_string(), 3), 1)?,
        ];

        assert!(indexes.len() == 2);
        let index = indexes.pop().unwrap();
        assert_eq!(index.name, "sqlite_autoindex_t1_2");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 3);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "b");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));

        let index = indexes.pop().unwrap();
        assert_eq!(index.name, "sqlite_autoindex_t1_1");
        assert_eq!(index.table_name, "t1");
        assert_eq!(index.root_page, 2);
        assert!(index.unique);
        assert_eq!(index.columns.len(), 1);
        assert_eq!(index.columns[0].name, "a");
        assert!(matches!(index.columns[0].order, SortOrder::Asc));

        Ok(())
    }
}
